package com.atguigu.flink.datastreamapi.agg;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Demo job: keeps a running sum of the {@code vc} field over every
 * {@link WaterSensor} record read from a socket and prints the updated
 * total after each record.
 *
 * <p>Created by Smexy on 2023/11/11
 */
public class Demo5_ProcessAgg {

    /** Configuration key and value applied to the environment before it is created. */
    private static final String REST_PORT_KEY = "rest.port";
    private static final int REST_PORT = 3333;

    /** Host and port of the text socket the job reads from. */
    private static final String SOURCE_HOST = "hadoop102";
    private static final int SOURCE_PORT = 8888;

    /**
     * Adds up the {@code vc} of every record it receives and emits the total so far.
     *
     * <p>The process task is bound to one function object, so the
     * {@code runningVc} field of that object is shared by all records that
     * task receives.
     */
    private static class VcSumFunction extends ProcessFunction<WaterSensor, String> {

        /** Sum of the vc values seen so far by this function object. */
        private int runningVc;

        /**
         * Called once for each arriving record.
         *
         * @param sensor the record that has just arrived
         * @param ctx    the context object, which exposes additional information about the record and the runtime
         * @param out    collector that receives the output strings
         */
        @Override
        public void processElement(WaterSensor sensor, Context ctx, Collector<String> out) throws Exception {
            runningVc += sensor.getVc();
            out.collect("所有传感器的vc之和:" + runningVc);
        }
    }

    /**
     * Builds the pipeline (socket source, map to {@link WaterSensor}, global
     * repartition, summing process function, print sink) and executes it.
     * A failure raised by the execution is printed as a stack trace.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = createEnvironment();

        env.socketTextStream(SOURCE_HOST, SOURCE_PORT)
                .map(new WaterSensorMapFunction())
                // global() sends every record to the first parallel instance of the
                // next operator, so a single function object sees all sensors.
                .global()
                // Aggregate the vc of all sensors.
                // NOTE: no explicit setParallelism(1) is applied to this operator.
                .process(new VcSumFunction())
                .print();

        try {
            env.execute();
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }

    /** Creates the execution environment with the {@code rest.port} option set to 3333. */
    private static StreamExecutionEnvironment createEnvironment() {
        Configuration settings = new Configuration();
        settings.setInteger(REST_PORT_KEY, REST_PORT);
        return StreamExecutionEnvironment.getExecutionEnvironment(settings);
    }
}
